> Production Agent Deployment
Budding
planted Jan 8, 2026 · tended Jan 8, 2026
#ai-agents#production#deployment#scaling#operations
Production Agent Deployment
🌿 Budding note — taking agents from prototype to production.
Production Readiness
Before deploying agents to production:
Requirements:
- Error handling and recovery
- Logging and monitoring
- Rate limiting and quotas
- Security hardening
- Cost management
- Performance optimization
Related: AI Agents Fundamentals and Agent Security Considerations
Architecture Patterns
1. API-Based Deployment
import time

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class AgentRequest(BaseModel):
    # Incoming task payload; user_id drives per-user rate limiting.
    task: str
    user_id: str


class AgentResponse(BaseModel):
    result: str
    tokens_used: int
    execution_time: float


@app.post("/agent/run", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
    """Run an agent task synchronously over HTTP.

    Raises:
        HTTPException(429): when the user exceeds their rate limit.
        HTTPException(500): on any other agent failure.
    """
    try:
        start = time.time()
        # Rate limit check — rate_limiter is assumed to be defined
        # elsewhere in the application (TODO confirm).
        if not rate_limiter.check(request.user_id):
            raise HTTPException(429, "Rate limit exceeded")
        # Execute agent
        result = await agent.process(request.task)
        return AgentResponse(
            result=result.text,
            tokens_used=result.usage.total_tokens,
            execution_time=time.time() - start,
        )
    except HTTPException:
        # Bug fix: without this clause the 429 above was caught by the
        # generic handler below and re-surfaced as a 500.
        raise
    except Exception as e:
        logger.error(f"Agent error: {e}", exc_info=True)
        raise HTTPException(500, str(e))
2. Queue-Based Processing
import json

from celery import Celery
from redis import Redis

app = Celery('agents', broker='redis://localhost:6379/0')
redis_client = Redis()


@app.task(bind=True, max_retries=3)
def process_agent_task(self, task_id: str, task_data: dict):
    """Background agent processing with status tracking in Redis.

    Status and result keys expire after one hour. Failures retry up to
    max_retries with exponential backoff.
    """
    try:
        # Mark the task as in-flight so pollers can observe progress.
        redis_client.setex(f"task:{task_id}:status", 3600, "processing")
        # Run agent (agent is assumed to be a module-level instance —
        # defined elsewhere; TODO confirm).
        result = agent.process(task_data["query"])
        # Store result, then flip the status.
        # Bug fix: the status key was previously left at "processing"
        # forever, even after the result was stored.
        redis_client.setex(
            f"task:{task_id}:result",
            3600,
            json.dumps(result)
        )
        redis_client.setex(f"task:{task_id}:status", 3600, "completed")
        return {"status": "completed", "result": result}
    except Exception as e:
        # Retry with exponential backoff (2, 4, 8 seconds). `raise` makes
        # the control flow explicit — Celery's retry() raises Retry.
        raise self.retry(exc=e, countdown=2 ** self.request.retries)
3. Serverless Deployment
# AWS Lambda handler
import json

# Reused across warm invocations (cold-start optimization); replaces the
# fragile `'agent_client' not in globals()` check.
agent_client = None


def lambda_handler(event, context):
    """Serverless agent execution.

    Returns an API-Gateway-style dict: 200 with the agent result, 400 on
    a malformed request body, 500 on agent failure.
    """
    global agent_client
    try:
        task = json.loads(event['body'])
    except (KeyError, TypeError, json.JSONDecodeError) as e:
        # Bug fix: a malformed request used to fall into the generic
        # handler and come back as a 500 instead of a 400.
        return {
            'statusCode': 400,
            'body': json.dumps({'error': f'Bad request: {e}'})
        }
    try:
        if agent_client is None:
            agent_client = initialize_agent()
        result = agent_client.process(task['query'])
        return {
            'statusCode': 200,
            'body': json.dumps({
                'result': result,
                # Bug fix: the Lambda context object exposes
                # `aws_request_id`, not `request_id`.
                'request_id': context.aws_request_id
            })
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
Cost Management
Token Budgets
import logging

from collections import defaultdict


class CostManager:
    """Track cumulative per-user API spend against a fixed budget."""

    def __init__(self, budget_per_user: float):
        self.budget = budget_per_user
        # user_id -> cumulative spend; missing users start at 0.0.
        self.usage = defaultdict(float)

    def check_budget(self, user_id: str, estimated_cost: float) -> bool:
        """Return True if the user can afford `estimated_cost` more."""
        current_usage = self.usage[user_id]
        return (current_usage + estimated_cost) <= self.budget

    def record_usage(self, user_id: str, actual_cost: float) -> None:
        """Track spending; warn once usage crosses 90% of the budget."""
        self.usage[user_id] += actual_cost
        if self.usage[user_id] > self.budget * 0.9:
            self.alert_approaching_limit(user_id)

    def alert_approaching_limit(self, user_id: str) -> None:
        """Hook fired when a user nears their budget; default just logs.

        Bug fix: this method was called by record_usage but never
        defined, so crossing 90% raised AttributeError. Override to
        page/notify in production.
        """
        logging.getLogger(__name__).warning(
            "User %s has used over 90%% of their budget", user_id
        )
# Usage
# NOTE(review): user_id, agent, task and BudgetExceeded are assumed to be
# defined by the surrounding application — this is an illustrative snippet.
cost_mgr = CostManager(budget_per_user=10.0)  # $10 per user
# Refuse work before spending, then record the actual cost afterwards.
if not cost_mgr.check_budget(user_id, estimated_cost=0.50):
    raise BudgetExceeded("Monthly budget exceeded")
result = agent.process(task)
cost_mgr.record_usage(user_id, result.cost)
Caching Strategies
import hashlib
from functools import lru_cache
class ResponseCache:
"""Cache agent responses"""
def __init__(self, redis_client):
self.redis = redis_client
self.ttl = 3600 # 1 hour
def get_cache_key(self, query: str, context: dict = None) -> str:
"""Generate cache key"""
data = f"{query}:{json.dumps(context or {}, sort_keys=True)}"
return hashlib.md5(data.encode()).hexdigest()
async def get_or_compute(self, query: str, context: dict, compute_fn):
"""Get from cache or compute"""
cache_key = self.get_cache_key(query, context)
# Check cache
cached = self.redis.get(f"agent:response:{cache_key}")
if cached:
return json.loads(cached)
# Compute
result = await compute_fn(query, context)
# Cache
self.redis.setex(
f"agent:response:{cache_key}",
self.ttl,
json.dumps(result)
)
return result
Monitoring
Metrics Collection
import time

from prometheus_client import Counter, Histogram, Gauge

# Request counter labelled by agent type and success/error outcome.
agent_requests = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['agent_type', 'status']
)
# End-to-end latency distribution per agent type.
agent_latency = Histogram(
    'agent_latency_seconds',
    'Agent response time',
    ['agent_type']
)
# Token consumption distribution per agent type.
agent_tokens = Histogram(
    'agent_tokens_used',
    'Tokens used per request',
    ['agent_type']
)
# In-flight gauge: incremented on entry, decremented in finally.
active_agents = Gauge(
    'agent_active_count',
    'Number of currently running agents'
)


async def monitored_agent_call(task: str, agent_type: str = 'research'):
    """Run the agent and record request/latency/token metrics.

    Args:
        task: prompt forwarded to the agent.
        agent_type: metric label — generalized from the previously
            hard-coded 'research' so other agent types can reuse this
            wrapper (default preserves the old behavior).
    """
    active_agents.inc()
    start = time.time()
    try:
        result = await agent.process(task)
        # Record success metrics.
        agent_requests.labels(agent_type=agent_type, status='success').inc()
        agent_latency.labels(agent_type=agent_type).observe(
            time.time() - start
        )
        agent_tokens.labels(agent_type=agent_type).observe(
            result.tokens_used
        )
        return result
    except Exception:
        agent_requests.labels(agent_type=agent_type, status='error').inc()
        raise
    finally:
        # Always decrement, even on failure, so the gauge never drifts.
        active_agents.dec()
Structured Logging
import structlog

logger = structlog.get_logger()


def log_agent_execution(
    agent_id: str,
    task: str,
    result: dict,
    metadata: dict
):
    """Emit one structured log event per agent execution.

    Args:
        agent_id: identifier of the agent that ran.
        task: the task text; only the first 100 chars are logged.
        result: expected to carry "status" and "tokens" keys (optional).
        metadata: expected to carry "latency" (seconds), "user_id",
            "request_id" (all optional).
    """
    latency = metadata.get("latency")
    logger.info(
        "agent_execution",
        agent_id=agent_id,
        task_preview=task[:100],
        status=result.get("status"),
        tokens_used=result.get("tokens"),
        # Bug fix: `metadata.get("latency") * 1000` raised TypeError
        # whenever latency was absent (None * 1000).
        latency_ms=latency * 1000 if latency is not None else None,
        user_id=metadata.get("user_id"),
        request_id=metadata.get("request_id")
    )
Error Handling
Retry Logic
from tenacity import (
    retry,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_exponential,
)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    # Bug fix: re-raising inside the function does NOT stop tenacity —
    # the decorator retries every exception unless told otherwise, so the
    # original retried rate limits despite the comment saying it didn't.
    retry=retry_if_not_exception_type(RateLimitError),
)
async def resilient_agent_call(task: str):
    """Agent call with automatic retry; rate-limit errors are not retried."""
    try:
        return await agent.process(task)
    except RateLimitError:
        # Propagate immediately — excluded from retry by the decorator.
        raise
    except Exception as e:
        logger.warning(f"Agent call failed, will retry: {e}")
        raise
Circuit Breaker
import time


class CircuitOpen(Exception):
    """Raised when a call is rejected because the circuit is open.

    Bug fix: this exception was raised by CircuitBreaker but never
    defined anywhere.
    """


class CircuitBreaker:
    """Prevent cascading failures by short-circuiting a failing dependency.

    States: "closed" (normal operation), "open" (rejecting all calls),
    "half_open" (letting one probe call through after the timeout).
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        # Seconds an open circuit waits before allowing a probe call.
        self.timeout = timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open

    async def call(self, func, *args, **kwargs):
        """Execute func through the breaker; raise CircuitOpen when open."""
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half_open"
            else:
                raise CircuitOpen("Circuit breaker is open")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # Bug fix: a failed half-open probe now re-opens immediately
            # instead of waiting to accumulate a full threshold again.
            if (self.state == "half_open"
                    or self.failure_count >= self.failure_threshold):
                self.state = "open"
            raise
        # Success: close the circuit and reset the failure streak.
        # Bug fix: the original only reset on half_open success, so
        # occasional failures spread over a long time (with successes in
        # between) could still trip the breaker.
        self.state = "closed"
        self.failure_count = 0
        return result
Scaling Strategies
Horizontal Scaling
# Load balancer distributes across agent instances
# Load balancer distributes across agent instances
class AgentPool:
    """A fixed pool of agent workers with round-robin dispatch."""

    def __init__(self, num_workers: int):
        # One worker per slot, identified by its index.
        self.workers = [AgentWorker(id=i) for i in range(num_workers)]
        self.current_worker = 0

    async def process(self, task: str):
        """Hand the task to the next worker in round-robin order."""
        idx = self.current_worker
        self.current_worker = (idx + 1) % len(self.workers)
        return await self.workers[idx].process(task)
Auto-Scaling
class AutoScaler:
    """Automatically scale agent workers based on queue pressure."""

    def __init__(
        self,
        min_workers: int = 1,
        max_workers: int = 10,
        scale_up_queue: int = 100,
        scale_down_queue: int = 10,
    ):
        # Generalized: the queue-length thresholds were hard-coded
        # (100 / 10); the defaults preserve the original behavior.
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.scale_up_queue = scale_up_queue
        self.scale_down_queue = scale_down_queue
        self.current_workers = min_workers

    async def check_and_scale(self, metrics: dict) -> None:
        """Scale up/down from metrics["queue_length"], within bounds.

        NOTE(review): the original also read metrics["avg_latency"] but
        never used it (and forced callers to supply it); consider folding
        latency into the scale-up rule.
        """
        queue_length = metrics["queue_length"]
        # Scale up if the queue is growing and we are under the cap.
        if (queue_length > self.scale_up_queue
                and self.current_workers < self.max_workers):
            await self.scale_up()
        # Scale down if mostly idle and above the floor.
        elif (queue_length < self.scale_down_queue
                and self.current_workers > self.min_workers):
            await self.scale_down()

    async def scale_up(self) -> None:
        """Add a worker (start of a new worker process/container goes here)."""
        self.current_workers += 1

    async def scale_down(self) -> None:
        """Remove a worker (stop of an idle worker goes here)."""
        self.current_workers -= 1
Health Checks
@app.get("/health")
async def health_check():
    """Service health endpoint: probes the LLM API, database, and Redis.

    Returns a dict with overall "status" ("healthy"/"unhealthy") and a
    per-dependency "checks" map. Any failed probe marks the service
    unhealthy but the remaining probes still run.
    """
    health = {
        "status": "healthy",
        "checks": {}
    }
    # Check LLM API with a minimal one-shot round trip.
    # Bug fix (all three probes): bare `except:` also swallowed
    # KeyboardInterrupt/SystemExit; narrowed to Exception.
    try:
        await client.messages.create(
            model="claude-sonnet-4-5-20250929",
            max_tokens=10,
            messages=[{"role": "user", "content": "test"}]
        )
        health["checks"]["llm_api"] = "ok"
    except Exception:
        health["checks"]["llm_api"] = "failed"
        health["status"] = "unhealthy"
    # Check database
    try:
        db.ping()
        health["checks"]["database"] = "ok"
    except Exception:
        health["checks"]["database"] = "failed"
        health["status"] = "unhealthy"
    # Check Redis
    try:
        redis_client.ping()
        health["checks"]["redis"] = "ok"
    except Exception:
        health["checks"]["redis"] = "failed"
        health["status"] = "unhealthy"
    return health
Deployment Checklist
## Pre-Deployment
- [ ] Load testing completed
- [ ] Error handling tested
- [ ] Logging configured
- [ ] Monitoring dashboards set up
- [ ] Rate limits configured
- [ ] Security review passed
- [ ] Cost budgets set
- [ ] Backup and recovery tested
- [ ] Documentation updated
- [ ] Runbook created
## Post-Deployment
- [ ] Monitor error rates
- [ ] Track latency metrics
- [ ] Review costs daily
- [ ] Check logs for issues
- [ ] Test rollback procedure
- [ ] Collect user feedback
Related: Agent Evaluation & Testing
Connection Points
Prerequisites:
- AI Agents Fundamentals — Agent basics
- Agent Security Considerations — Security in production
Related:
- Agent Frameworks Comparison — Framework deployment
- Claude Agent Patterns — Claude optimization
- Agent Memory Systems — Production memory
Testing:
- Agent Evaluation & Testing — Pre-production testing
>> referenced by (10)
Agent Evaluation and Testing
..._rate_24h"] > 0.05: self.alert("Error rate above 5%") ``` Related: [[Production Agent Deployment]] Connection Points Prerequisites: - [[AI Agents Fundamentals]] β Agent...
Agent Frameworks Comparison
...self.spent += self.cost_per_call return response `` Related: [[Production Agent Deployment]] Migration Patterns From LangChain to LangGraph ``python Before: La...
Agent Memory Systems
...t) await self.vector_store.upsert_async(vector, metadata) ``` Related: [[Production Agent Deployment]] Connection Points Prerequisites: - [[AI Agents Fundamentals]] β Agent...
Agent Security Considerations
...ecurity testing completed - [ ] Incident response plan documented ``` Related: [[Production Agent Deployment]] Connection Points Prerequisites: - [[AI Agents Fundamentals]] β Agent...
AI Agents
...[Agent Security Considerations]] πΏ β Prompt injection, tool safety, auditing - [[Production Agent Deployment]] πΏ β Scaling, monitoring, and operations Production Considerations Ope...
AI Agents Fundamentals
...ts(code) return code `` Related: [[Building Agents with LangChain]], [[Production Agent Deployment]] Research Assistant ``python class ResearchAgent: async def research_...
Building Agents with LangChain
...β Tool patterns - [[Agent Memory Systems]] β LangChain memory Advanced: - [[Production Agent Deployment]] β Deploying LangChain agents - [[Agent Security Considerations]] β LangChain se...
Claude Agent Patterns
...ude agents - [[Agent Security Considerations]] β Claude safety Advanced: - [[Production Agent Deployment]] β Scaling Claude agents - [[Building Agents with LangChain]] β LangChain + Clau...
Multi-Agent Systems
...β CrewAI, LangGraph - [[Agent Evaluation & Testing]] β Testing collaborations - [[Production Agent Deployment]] β Scaling multiple agents
Tool Use and Function Calling
...safety - [[Building Agents with LangChain]] β LangChain tools Advanced: - [[Production Agent Deployment]] β Production tool management - [[Agent Evaluation & Testing]] β Testing tool re...